feat(logindex): add RocksDB TableProperties-based LogIndex module#239
ruojieranyishen wants to merge 6 commits into arana-db:main from
Conversation
Implements complete logindex module with: - LogIndexAndSequenceCollector for seqno-to-logindex mapping - TablePropertiesCollector for SST metadata (LargestLogIndex/LargestSequenceNumber) - LogIndexOfColumnFamilies for CF state tracking - EventListener for flush completion handling
📝 Walkthrough
Adds a new workspace crate.
Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as Application
    participant Collector as LogIndexAndSequenceCollector
    participant RocksDB as RocksDB
    participant Props as TablePropertiesCollector
    participant EventListener as LogIndexAndSequenceCollectorPurger
    participant Tracker as LogIndexOfColumnFamilies
    App->>Collector: update(log_index, seqno)
    Collector->>Collector: append sample (VecDeque)
    App->>RocksDB: write
    App->>RocksDB: flush
    RocksDB->>Props: build SST (finish)
    Props->>Collector: find_applied_log_index(seqno)
    Collector-->>Props: log_index
    Props->>RocksDB: store user property "<log_index>/<seq>"
    RocksDB->>EventListener: on_flush_completed(flush_info)
    EventListener->>Collector: find_applied_log_index(largest_seqno)
    Collector-->>EventListener: log_index
    EventListener->>Tracker: set_flushed_log_index_global(log_index, seqno)
    EventListener->>Collector: purge(smallest_applied)
    EventListener->>App: snapshot_callback (every N flushes)
```
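The seqno-to-logindex flow above can be sketched in standalone Rust. This is a hedged sketch, not the crate's actual code: the name `LogCollector` and all signatures here are assumptions mirroring the walkthrough's `LogIndexAndSequenceCollector`.

```rust
use std::collections::VecDeque;

type LogIndex = i64;
type SequenceNumber = u64;

/// Minimal sketch of a seqno-to-logindex collector.
/// Samples are appended in seqno order, so lookup scans for the
/// last sample whose first seqno is <= the queried seqno.
struct LogCollector {
    samples: VecDeque<(SequenceNumber, LogIndex)>,
}

impl LogCollector {
    fn new() -> Self {
        Self { samples: VecDeque::new() }
    }

    /// Record that `log_index` was applied starting at `seqno`.
    fn update(&mut self, log_index: LogIndex, seqno: SequenceNumber) {
        self.samples.push_back((seqno, log_index));
    }

    /// Find the log index applied at or before `seqno`, if any.
    fn find_applied_log_index(&self, seqno: SequenceNumber) -> Option<LogIndex> {
        self.samples
            .iter()
            .take_while(|(s, _)| *s <= seqno)
            .last()
            .map(|(_, li)| *li)
    }

    /// Drop leading samples whose log index is below `smallest_applied`,
    /// the bound the event listener computes after a flush completes.
    fn purge(&mut self, smallest_applied: LogIndex) {
        while matches!(self.samples.front(), Some((_, li)) if *li < smallest_applied) {
            self.samples.pop_front();
        }
    }
}
```

In the PR, `update` runs on the apply path, `find_applied_log_index` is called from the table-properties collector at SST build time, and `purge` runs from the flush listener; the sketch only shows the data-structure invariant those callers rely on.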
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 8
🧹 Nitpick comments (6)
src/logindex/src/cf_tracker.rs (1)
185-201: `get_pending_flush_gap` can avoid `BTreeSet` allocation.
You only need a running min/max here; using a set adds extra allocations and O(n log n) work.
♻️ Proposed simplification
```diff
-    let mut s: BTreeSet<LogIndex> = BTreeSet::new();
-    for i in 0..COLUMN_FAMILY_COUNT {
-        s.insert(cf[i].applied_index.log_index());
-        s.insert(cf[i].flushed_index.log_index());
-    }
-    if s.is_empty() {
-        return 0;
-    }
-    if s.len() == 1 {
-        return 0;
-    }
-    let first = *s.first().unwrap();
-    let last = *s.last().unwrap();
-    (last - first) as u64
+    let mut min_li = i64::MAX;
+    let mut max_li = i64::MIN;
+    for i in 0..COLUMN_FAMILY_COUNT {
+        let a = cf[i].applied_index.log_index();
+        let f = cf[i].flushed_index.log_index();
+        min_li = min_li.min(a).min(f);
+        max_li = max_li.max(a).max(f);
+    }
+    if min_li == i64::MAX || min_li == max_li {
+        0
+    } else {
+        (max_li - min_li) as u64
+    }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/cf_tracker.rs` around lines 185 - 201, The get_pending_flush_gap function currently builds a BTreeSet to compute the min/max log_index across column families; instead, avoid allocation by iterating cf (from cf.read()) for i in 0..COLUMN_FAMILY_COUNT and tracking running min and max of cf[i].applied_index.log_index() and cf[i].flushed_index.log_index(); if no entries or min==max return 0, otherwise return (max - min) as u64. Update the function to remove the BTreeSet usage and use simple numeric min/max variables while preserving reads of applied_index.log_index() and flushed_index.log_index().
src/logindex/Cargo.toml (1)
13-18: Remove `tempfile` from runtime dependencies; it's only used in tests.
All usages of `tempfile` in the logindex crate are confined to `#[cfg(test)]` blocks (event_listener.rs, cf_tracker.rs, table_properties.rs) and test files. Remove it from `[dependencies]` on line 13 to keep it exclusively in `[dev-dependencies]`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/Cargo.toml` around lines 13 - 18, Remove the runtime dependency entry for tempfile in the logindex crate: delete the "tempfile.workspace = true" line from the [dependencies] section (leave "log.workspace = true" as-is) and ensure tempfile remains declared under [dev-dependencies] (where "tempfile.workspace = true" already exists) so tempfile is only available for tests; no code changes are necessary because all uses are inside #[cfg(test)] blocks.
src/raft/src/rocksdb_integration/table_properties_collector.rs (1)
110-144: Consider extracting shared logic to reduce duplication.
`finish()` and `get_readable_properties()` contain nearly identical code for building the properties HashMap. A small helper could eliminate this duplication.
Suggested refactor
```diff
+    fn build_properties(&self) -> HashMap<Vec<u8>, Vec<u8>> {
+        let mut properties = HashMap::new();
+        let min_log_index = *self.min_log_index.read();
+        let min_sequence = *self.min_sequence.read();
+
+        if let (Some(log_index), Some(sequence)) = (min_log_index, min_sequence) {
+            let value = format!("{}/{}", log_index, sequence);
+            properties.insert(
+                LOG_INDEX_PROPERTY_KEY.as_bytes().to_vec(),
+                value.as_bytes().to_vec(),
+            );
+        }
+        properties
+    }
+
     fn finish(&mut self) -> HashMap<Vec<u8>, Vec<u8>> {
-        let mut properties = HashMap::new();
-        let min_log_index = *self.min_log_index.read();
-        let min_sequence = *self.min_sequence.read();
-
-        if let (Some(log_index), Some(sequence)) = (min_log_index, min_sequence) {
-            let value = format!("{}/{}", log_index, sequence);
-            properties.insert(
-                LOG_INDEX_PROPERTY_KEY.as_bytes().to_vec(),
-                value.as_bytes().to_vec(),
-            );
-        }
-        properties
+        self.build_properties()
     }

     fn get_readable_properties(&self) -> HashMap<Vec<u8>, Vec<u8>> {
-        let mut properties = HashMap::new();
-        let min_log_index = *self.min_log_index.read();
-        let min_sequence = *self.min_sequence.read();
-
-        if let (Some(log_index), Some(sequence)) = (min_log_index, min_sequence) {
-            let value = format!("{}/{}", log_index, sequence);
-            properties.insert(
-                LOG_INDEX_PROPERTY_KEY.as_bytes().to_vec(),
-                value.as_bytes().to_vec(),
-            );
-        }
-        properties
+        self.build_properties()
     }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/raft/src/rocksdb_integration/table_properties_collector.rs` around lines 110 - 144, Extract the duplicated property-building logic from finish() and get_readable_properties() into a single private helper (e.g., build_properties() or format_min_props()) that reads min_log_index and min_sequence, formats the "<log_index>/<sequence>" string, and inserts it under LOG_INDEX_PROPERTY_KEY.as_bytes().to_vec(); then have both finish() and get_readable_properties() call that helper and return its HashMap to remove the repeated code and keep behavior identical.
supportlogindex.md (2)
85-87: Add language specifiers to fenced code blocks.
Several code blocks lack language specifiers, which reduces readability and prevents proper syntax highlighting. Use `text` or an appropriate language identifier.
Suggested fix
````diff
-```
+```text
 "LargestLogIndex/LargestSequenceNumber"
-```
+```text
 "<log_index>/<sequence>"
-```
+```text
 "233333/5" // log_index=233333, sequence=5
-```
+```text
 src/logindex/ # new crate or submodule
 ...
-```
+```text
 LogIndexAndSequencePair
 LogIndexSeqnoPair
 ...
````

Also applies to: 90-92, 97-99, 150-163, 253-270
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@supportlogindex.md` around lines 85 - 87, Multiple fenced code blocks (e.g., the blocks containing "LargestLogIndex/LargestSequenceNumber", "<log_index>/<sequence>", "233333/5", the src/logindex/ snippet, and LogIndexAndSequencePair / LogIndexSeqnoPair) are missing language specifiers; update each triple-backtick fence to include an appropriate language tag (use `text` or a more specific language where applicable) for all occurrences noted (lines around 85-87, 90-92, 97-99, 150-163, 253-270) so these blocks render with proper syntax highlighting and readability.

8-11: Replace local filesystem paths with relative or placeholder paths.
The document contains absolute paths from a specific developer's machine (e.g., `/home/lpf/dev/kiwi`). These are not portable and may confuse other contributors. Consider using relative paths, placeholders like `<your-local-path>`, or removing them entirely if they're only relevant during initial development.
Also applies to: 45-45, 105-106, 276-279
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@supportlogindex.md` around lines 8 - 11, Replace the hardcoded absolute developer paths in supportlogindex.md (e.g., `/home/lpf/dev/kiwi` and `/home/lpf/dev/kiwi-cpp`) with relative paths or placeholders such as `<your-local-path>` or `./kiwi`/`./kiwi-cpp`; update the referenced file locations (e.g., `src/storage/src/log_index.h`, `log_index.cc`) to use those placeholders or relative references so the document is portable and remove any machine-specific traces throughout the file (also apply the same change to the other occurrences noted in the review).

src/logindex/tests/logindex_integration.rs (1)
110-115: Consider returning an error for out-of-bounds `cf_id` instead of falling back.
When `cf_id >= CF_NAMES.len()`, returning `get_properties_of_all_tables()` masks invalid inputs and could hide bugs in callers. Returning an error would make test failures more explicit.
Suggested fix

```diff
     fn get_properties_of_all_tables_cf(
         &self,
         cf_id: usize,
     ) -> std::result::Result<
         rocksdb::table_properties::TablePropertiesCollection,
         rocksdb::Error,
     > {
         if cf_id < CF_NAMES.len() {
             let cf = self.db.cf_handle(CF_NAMES[cf_id]).expect("cf handle");
             self.db.get_properties_of_all_tables_cf(&cf)
         } else {
-            self.db.get_properties_of_all_tables()
+            Err(rocksdb::Error::new(format!("Invalid cf_id: {}", cf_id)))
         }
     }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/tests/logindex_integration.rs` around lines 110 - 115, The current branch silently falls back to self.db.get_properties_of_all_tables() when cf_id is out-of-bounds, which masks invalid inputs; change the cf_id check (using CF_NAMES and cf_id) to return an Err with a clear message (e.g., "invalid column family id") instead of calling get_properties_of_all_tables(), and only call self.db.get_properties_of_all_tables_cf(&cf) after successfully obtaining cf via self.db.cf_handle(CF_NAMES[cf_id]); ensure the function's return type remains a Result so callers/tests fail explicitly on invalid cf_id.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@Cargo.toml`:
- Line 46: Replace the floating branch reference for the rocksdb dependency in
Cargo.toml with an explicit immutable commit: change the rocksdb dependency that
currently uses branch = "addtableproperties" to use rev =
"f7abb18c64fac810f3c4736aef833c340396449b" so the manifest itself pins the crate
to that exact commit while keeping package = "rust-rocksdb", git URL, and
features (e.g., "multi-threaded-cf") unchanged.
In `@src/logindex/src/cf_tracker.rs`:
- Around line 134-177: Each public method that indexes cf
(set_flushed_log_index, set_flushed_log_index_global, is_applied, update) must
validate cf_id before doing cf[cf_id] to avoid panics; add a bounds check at the
top of each method (e.g., if cf_id >= COLUMN_FAMILY_COUNT { return; } for void
methods or return false for is_applied) or compare against self.cf.read().len()
if dynamic, and only proceed to read/write cf[] after the check so invalid cf_id
is handled safely instead of panicking.
In `@src/logindex/src/collector.rs`:
- Line 17: Run rustfmt (cargo fmt) on the module src/logindex/src/collector.rs
and apply the formatting changes to satisfy rustfmt (fix the formatting diffs
reported around line 17 and line 80). Update the file so it conforms to rustfmt
rules and commit the formatted file; ensure no logic is changed—only
whitespace/formatting adjustments in collector.rs.
- Around line 45-50: In LogCollector::new (the new function handling
step_length_bit) guard against shifts >=64: replace the current calculation of
step_length_mask with logic that treats step_length_bit==0 => 0, step_length_bit
>= 64 => u64::MAX, otherwise compute (1u64 << step_length_bit) - 1; reference
the step_length_bit parameter and step_length_mask variable so the change
prevents shift overflow/panic in debug builds and wrapping in release builds.
In `@src/logindex/src/event_listener.rs`:
- Around line 119-131: The code sets manual_flushing_cf unconditionally which
can claim manual-flush state even when flush_trigger is None; modify the logic
in the block around manual_flushing_cf/flush_trigger so you first check whether
self.flush_trigger.is_some() (or obtain the trigger via if let Some(trigger) =
self.flush_trigger) and only perform the compare_exchange on manual_flushing_cf
when a trigger exists; alternatively, if you must set it first, immediately roll
back (reset manual_flushing_cf) when flush_trigger is None—ensure this change
references the existing symbols manual_flushing_cf, flush_trigger,
compare_exchange, trigger, and COLUMN_FAMILY_COUNT and preserves the existing
early return behavior when compare_exchange fails.
In `@src/logindex/src/lib.rs`:
- Around line 36-47: COLUMN_FAMILY_COUNT and CF_NAMES can drift from
storage::ColumnFamilyIndex; add a compile-time assertion to ensure
CF_NAMES.len() == COLUMN_FAMILY_COUNT (use a const _: () = assert!(...)) and add
a unit test that iterates CF_NAMES, converts the index to
storage::ColumnFamilyIndex (e.g., with unsafe transmute from usize to
ColumnFamilyIndex) and asserts cf_index.name() == CF_NAMES[i] to catch
name/order mismatches; reference COLUMN_FAMILY_COUNT, CF_NAMES, and
storage::ColumnFamilyIndex::name() when adding these checks.
In `@src/logindex/src/table_properties.rs`:
- Around line 140-143: The parser currently accepts values with extra '/'
segments by ignoring trailing parts; update the parsing in the block that
creates LogIndexAndSequencePair (where s, parts, LogIndex, SequenceNumber and
LogIndexAndSequencePair::new are used) to require exactly two segments: parse
parts.next() for log_index, parts.next() for seqno, and then ensure parts.next()
returns None (reject if more segments exist); return None on any parse failure
so malformed values like "a/b/c" are rejected.
- Around line 156-159: The current update only when pair.applied_log_index() >
max_log_index loses ties; change the update condition for
max_log_index/max_seqno so it also updates when pair.applied_log_index() ==
max_log_index but pair.seqno() is greater: i.e., in the block referencing
pair.applied_log_index(), max_log_index, max_seqno and pair.seqno(), replace the
strict greater check with a compound condition that updates when
applied_log_index is greater OR when equal and seqno is larger, then assign both
max_log_index and max_seqno inside that branch.
---
Nitpick comments:
In `@src/logindex/Cargo.toml`:
- Around line 13-18: Remove the runtime dependency entry for tempfile in the
logindex crate: delete the "tempfile.workspace = true" line from the
[dependencies] section (leave "log.workspace = true" as-is) and ensure tempfile
remains declared under [dev-dependencies] (where "tempfile.workspace = true"
already exists) so tempfile is only available for tests; no code changes are
necessary because all uses are inside #[cfg(test)] blocks.
In `@src/logindex/src/cf_tracker.rs`:
- Around line 185-201: The get_pending_flush_gap function currently builds a
BTreeSet to compute the min/max log_index across column families; instead, avoid
allocation by iterating cf (from cf.read()) for i in 0..COLUMN_FAMILY_COUNT and
tracking running min and max of cf[i].applied_index.log_index() and
cf[i].flushed_index.log_index(); if no entries or min==max return 0, otherwise
return (max - min) as u64. Update the function to remove the BTreeSet usage and
use simple numeric min/max variables while preserving reads of
applied_index.log_index() and flushed_index.log_index().
In `@src/logindex/tests/logindex_integration.rs`:
- Around line 110-115: The current branch silently falls back to
self.db.get_properties_of_all_tables() when cf_id is out-of-bounds, which masks
invalid inputs; change the cf_id check (using CF_NAMES and cf_id) to return an
Err with a clear message (e.g., "invalid column family id") instead of calling
get_properties_of_all_tables(), and only call
self.db.get_properties_of_all_tables_cf(&cf) after successfully obtaining cf via
self.db.cf_handle(CF_NAMES[cf_id]); ensure the function's return type remains a
Result so callers/tests fail explicitly on invalid cf_id.
In `@src/raft/src/rocksdb_integration/table_properties_collector.rs`:
- Around line 110-144: Extract the duplicated property-building logic from
finish() and get_readable_properties() into a single private helper (e.g.,
build_properties() or format_min_props()) that reads min_log_index and
min_sequence, formats the "<log_index>/<sequence>" string, and inserts it under
LOG_INDEX_PROPERTY_KEY.as_bytes().to_vec(); then have both finish() and
get_readable_properties() call that helper and return its HashMap to remove the
repeated code and keep behavior identical.
In `@supportlogindex.md`:
- Around line 85-87: Multiple fenced code blocks (e.g., the blocks containing
"LargestLogIndex/LargestSequenceNumber", "<log_index>/<sequence>", "233333/5",
the src/logindex/ snippet, and LogIndexAndSequencePair / LogIndexSeqnoPair) are
missing language specifiers; update each triple-backtick fence to include an
appropriate language tag (use `text` or a more specific language where
applicable) for all occurrences noted (lines around 85-87, 90-92, 97-99,
150-163, 253-270) so these blocks render with proper syntax highlighting and
readability.
- Around line 8-11: Replace the hardcoded absolute developer paths in
supportlogindex.md (e.g., `/home/lpf/dev/kiwi` and `/home/lpf/dev/kiwi-cpp`)
with relative paths or placeholders such as `<your-local-path>` or
`./kiwi`/`./kiwi-cpp`; update the referenced file locations (e.g.,
`src/storage/src/log_index.h`, `log_index.cc`) to use those placeholders or
relative references so the document is portable and remove any machine-specific
traces throughout the file (also apply same change to the other occurrences
noted in the review).
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (12)
- Cargo.toml
- src/logindex/Cargo.toml
- src/logindex/src/cf_tracker.rs
- src/logindex/src/collector.rs
- src/logindex/src/db_access.rs
- src/logindex/src/event_listener.rs
- src/logindex/src/lib.rs
- src/logindex/src/table_properties.rs
- src/logindex/src/types.rs
- src/logindex/tests/logindex_integration.rs
- src/raft/src/rocksdb_integration/table_properties_collector.rs
- supportlogindex.md
d1a9c2c to c79af76
Actionable comments posted: 1
♻️ Duplicate comments (5)
src/logindex/src/event_listener.rs (1)
115-127: ⚠️ Potential issue | 🟠 Major
Don't claim manual-flush state when no trigger is configured.
If `flush_trigger` is `None`, Line 117 can still set `manual_flushing_cf`, and nothing clears it, which can block later manual flush scheduling.
🔧 Proposed fix
```diff
-    if self
-        .manual_flushing_cf
-        .compare_exchange(-1, target_cf as i64, Ordering::SeqCst, Ordering::SeqCst)
-        .is_err()
-    {
-        return;
-    }
-
-    if let Some(ref trigger) = self.flush_trigger {
-        if target_cf < COLUMN_FAMILY_COUNT {
-            trigger(target_cf);
-        }
-    }
+    let Some(ref trigger) = self.flush_trigger else {
+        return;
+    };
+
+    if self
+        .manual_flushing_cf
+        .compare_exchange(-1, target_cf as i64, Ordering::SeqCst, Ordering::SeqCst)
+        .is_err()
+    {
+        return;
+    }
+
+    if target_cf < COLUMN_FAMILY_COUNT {
+        trigger(target_cf);
+    }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/event_listener.rs` around lines 115 - 127, The code currently sets manual_flushing_cf via compare_exchange even when flush_trigger is None, leaving the state claimed and blocking future manual flushes; change the logic to only claim manual_flushing_cf when a valid flush_trigger exists (and target_cf < COLUMN_FAMILY_COUNT) — i.e., check self.flush_trigger.is_some() (and the target_cf bound) before calling compare_exchange, or if you must set first then ensure you immediately revert/reset manual_flushing_cf to -1 when flush_trigger is None or target_cf is out of range; refer to manual_flushing_cf, flush_trigger, compare_exchange, trigger, target_cf and COLUMN_FAMILY_COUNT to locate and fix the code.
src/logindex/src/collector.rs (1)
39-44: ⚠️ Potential issue | 🟠 Major
Guard `step_length_bit` before shifting.
Line 43 can overflow/panic (or wrap) when `step_length_bit >= 64`. Please clamp this branch explicitly.
🛠️ Proposed fix
```diff
-    let step_length_mask = if step_length_bit == 0 {
-        0
-    } else {
-        (1u64 << step_length_bit) - 1
-    };
+    let step_length_mask = match step_length_bit {
+        0 => 0,
+        1..=63 => (1u64 << step_length_bit) - 1,
+        _ => u64::MAX,
+    };
```

In Rust stable, what is the behavior of `1u64 << n` when `n >= 64` in debug vs release builds, and which API (`checked_shl` / `wrapping_shl`) is recommended for explicit behavior?
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/collector.rs` around lines 39 - 44, The calculation of step_length_mask in Collector::new uses (1u64 << step_length_bit) which is undefined in debug and wraps in release when step_length_bit >= 64; clamp or bound step_length_bit before shifting (e.g., cap to 63 via step_length_bit = step_length_bit.min(63) or use step_length_bit.clamp(0,63)) or use u64::checked_shl and handle None to avoid the overflow; update the new function (referencing step_length_bit and step_length_mask) to enforce this bound so the mask computation is safe and explicit.
src/logindex/src/table_properties.rs (2)
149-153: ⚠️ Potential issue | 🟠 Major
Handle `log_index` ties by selecting the higher `seqno`.
The current strict-`>` condition can keep an older sequence number when multiple SSTs have the same `log_index`.
🔧 Proposed fix
```diff
-    if let Some(pair) = read_stats_from_table_props(&user_props) {
-        if pair.applied_log_index() > max_log_index {
-            max_log_index = pair.applied_log_index();
-            max_seqno = pair.seqno();
-        }
-    }
+    if let Some(pair) = read_stats_from_table_props(&user_props) {
+        if pair.applied_log_index() > max_log_index
+            || (pair.applied_log_index() == max_log_index && pair.seqno() > max_seqno)
+        {
+            max_log_index = pair.applied_log_index();
+            max_seqno = pair.seqno();
+        }
+    }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/table_properties.rs` around lines 149 - 153, The current comparison using only pair.applied_log_index() > max_log_index can leave an older seqno when two SSTs share the same log_index; modify the logic in the read_stats_from_table_props handling so that you update max_log_index and max_seqno when the pair's applied_log_index is greater OR when it's equal and pair.seqno() is greater than max_seqno. In other words, replace the strict-">" check with a condition like (pair.applied_log_index() > max_log_index) || (pair.applied_log_index() == max_log_index && pair.seqno() > max_seqno) and assign max_log_index = pair.applied_log_index(); max_seqno = pair.seqno() inside that branch.
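The tie-break rule requested above is just lexicographic comparison of `(log_index, seqno)` pairs, which Rust's tuple `Ord` provides for free. A hypothetical standalone illustration (not the crate's actual types):

```rust
// Each SST contributes an (applied_log_index, seqno) pair parsed from its
// user property. Selecting the recovery point with the review's tie-break
// rule is a lexicographic max over the pairs: log_index first, then seqno.
type LogIndex = i64;
type SequenceNumber = u64;

fn max_applied(pairs: &[(LogIndex, SequenceNumber)]) -> Option<(LogIndex, SequenceNumber)> {
    // Tuple Ord compares the first field, then the second, so equal log
    // indexes are resolved in favor of the larger sequence number.
    pairs.iter().copied().max()
}
```

Tracking the pair as a single tuple also removes the possibility of updating `max_log_index` without `max_seqno`, the bug class the review comment describes.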
133-137: ⚠️ Potential issue | 🟡 Minor
Reject malformed property payloads with extra `/` segments.
`read_stats_from_table_props` currently accepts values like `a/b/c` by ignoring trailing parts.
🔒 Proposed parser hardening
```diff
     let mut parts = s.split('/');
     let log_index: LogIndex = parts.next()?.parse().ok()?;
     let seqno: SequenceNumber = parts.next()?.parse().ok()?;
+    if parts.next().is_some() {
+        return None;
+    }
     Some(LogIndexAndSequencePair::new(log_index, seqno))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/table_properties.rs` around lines 133 - 137, The parser in read_stats_from_table_props currently accepts payloads with extra '/' segments (e.g., "a/b/c") because it ignores any parts after parsing log_index and seqno; change the logic so after parsing log_index and seqno (the code creating LogIndexAndSequencePair::new(log_index, seqno)) you explicitly check that there are no remaining segments (e.g., ensure parts.next().is_none()) and return None (reject) if any extra segment exists, so only exactly two '/'-separated components are accepted.
src/logindex/src/cf_tracker.rs (1)
131-138: ⚠️ Potential issue | 🟠 Major
Validate `cf_id` before indexing `cf[cf_id]` in public methods.
These paths can panic on invalid input. Add a shared bounds check and return safely for out-of-range IDs.
🧱 Proposed guard pattern
```diff
 impl LogIndexOfColumnFamilies {
+    #[inline]
+    fn valid_cf_id(cf_id: usize) -> bool {
+        cf_id < COLUMN_FAMILY_COUNT
+    }
+
     pub fn set_flushed_log_index(&self, cf_id: usize, log_index: LogIndex, seqno: SequenceNumber) {
+        if !Self::valid_cf_id(cf_id) {
+            return;
+        }
         let cf = self.cf.write();
         let li = cf[cf_id].flushed_index.log_index();
         let seq = cf[cf_id].flushed_index.seqno();
         cf[cf_id]
             .flushed_index
             .set(log_index.max(li), seqno.max(seq));
     }

     pub fn is_applied(&self, cf_id: usize, cur_log_index: LogIndex) -> bool {
+        if !Self::valid_cf_id(cf_id) {
+            return false;
+        }
         cur_log_index < self.cf.read()[cf_id].applied_index.log_index()
     }

     pub fn update(&self, cf_id: usize, cur_log_index: LogIndex, cur_seqno: SequenceNumber) {
+        if !Self::valid_cf_id(cf_id) {
+            return;
+        }
         let cf = self.cf.write();
         ...
     }

     pub fn get_cf_applied(&self, cf_id: usize) -> (LogIndex, SequenceNumber) {
+        if !Self::valid_cf_id(cf_id) {
+            return (0, 0);
+        }
         let cf = self.cf.read();
         ...
     }

     pub fn get_cf_flushed(&self, cf_id: usize) -> (LogIndex, SequenceNumber) {
+        if !Self::valid_cf_id(cf_id) {
+            return (0, 0);
+        }
         let cf = self.cf.read();
         ...
     }
 }
```

Also applies to: 159-161, 164-180, 218-233
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/cf_tracker.rs` around lines 131 - 138, Public methods like set_flushed_log_index currently index into the cf vector (via cf.write() and cf[cf_id]) without validating cf_id, which can panic on out-of-range IDs; add a shared bounds check at start of each public method that touches cf (e.g., set_flushed_log_index, the methods around the ranges noted) by acquiring the write/read guard, verifying cf_id < cf.len(), and returning early (or returning a Result/Option if the API prefers) when out of range; update references to cf[cf_id] to use the guarded access after the check so all indexing is safe and non-panicking.
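The bounds-check pattern asked for above can also be folded into one place with `slice::get`, so no public accessor ever indexes unchecked. A hypothetical sketch with invented types, assuming a `COLUMN_FAMILY_COUNT` of 4 purely for illustration:

```rust
const COLUMN_FAMILY_COUNT: usize = 4; // assumed value for illustration

#[derive(Default, Clone, Copy)]
struct CfState {
    applied: i64,
    flushed: i64,
}

struct Tracker {
    cf: Vec<CfState>,
}

impl Tracker {
    fn new() -> Self {
        Self { cf: vec![CfState::default(); COLUMN_FAMILY_COUNT] }
    }

    // Single shared bounds check: out-of-range ids yield None instead of
    // panicking the way a raw cf[cf_id] index would.
    fn state(&self, cf_id: usize) -> Option<&CfState> {
        self.cf.get(cf_id)
    }

    fn is_applied(&self, cf_id: usize, cur_log_index: i64) -> bool {
        // Invalid cf_id safely answers "not applied" rather than panicking.
        self.state(cf_id)
            .map(|s| cur_log_index < s.applied)
            .unwrap_or(false)
    }
}
```

Centralizing the check in one private accessor keeps every public method honest without repeating the comparison, which is the same effect as the reviewer's proposed `valid_cf_id` helper.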
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/logindex/Cargo.toml`:
- Around line 13-18: Remove tempfile from the main [dependencies] and add it
under [dev-dependencies], and remove the redundant rocksdb entry from
[dev-dependencies] so rocksdb remains only in [dependencies]; specifically
update Cargo.toml to ensure tempfile is declared only in [dev-dependencies]
(used by #[cfg(test)] code in table_properties.rs, event_listener.rs,
cf_tracker.rs) and rocksdb is declared only in [dependencies] (used by
table_properties.rs, event_listener.rs, db_access.rs, cf_tracker.rs), deleting
the duplicate entries accordingly.
---
Duplicate comments:
In `@src/logindex/src/cf_tracker.rs`:
- Around line 131-138: Public methods like set_flushed_log_index currently index
into the cf vector (via cf.write() and cf[cf_id]) without validating cf_id,
which can panic on out-of-range IDs; add a shared bounds check at start of each
public method that touches cf (e.g., set_flushed_log_index, the methods around
the ranges noted) by acquiring the write/read guard, verifying cf_id < cf.len(),
and returning early (or returning a Result/Option if the API prefers) when out
of range; update references to cf[cf_id] to use the guarded access after the
check so all indexing is safe and non-panicking.
In `@src/logindex/src/collector.rs`:
- Around line 39-44: The calculation of step_length_mask in Collector::new uses
(1u64 << step_length_bit) which is undefined in debug and wraps in release when
step_length_bit >= 64; clamp or bound step_length_bit before shifting (e.g., cap
to 63 via step_length_bit = step_length_bit.min(63) or use
step_length_bit.clamp(0,63)) or use u64::checked_shl and handle None to avoid
the overflow; update the new function (referencing step_length_bit and
step_length_mask) to enforce this bound so the mask computation is safe and
explicit.
In `@src/logindex/src/event_listener.rs`:
- Around line 115-127: The code currently sets manual_flushing_cf via
compare_exchange even when flush_trigger is None, leaving the state claimed and
blocking future manual flushes; change the logic to only claim
manual_flushing_cf when a valid flush_trigger exists (and target_cf <
COLUMN_FAMILY_COUNT) — i.e., check self.flush_trigger.is_some() (and the
target_cf bound) before calling compare_exchange, or if you must set first then
ensure you immediately revert/reset manual_flushing_cf to -1 when flush_trigger
is None or target_cf is out of range; refer to manual_flushing_cf,
flush_trigger, compare_exchange, trigger, target_cf and COLUMN_FAMILY_COUNT to
locate and fix the code.
In `@src/logindex/src/table_properties.rs`:
- Around line 149-153: The current comparison using only
pair.applied_log_index() > max_log_index can leave an older seqno when two SSTs
share the same log_index; modify the logic in the read_stats_from_table_props
handling so that you update max_log_index and max_seqno when the pair's
applied_log_index is greater OR when it's equal and pair.seqno() is greater than
max_seqno. In other words, replace the strict-">" check with a condition like
(pair.applied_log_index() > max_log_index) || (pair.applied_log_index() ==
max_log_index && pair.seqno() > max_seqno) and assign max_log_index =
pair.applied_log_index(); max_seqno = pair.seqno() inside that branch.
- Around line 133-137: The parser in read_stats_from_table_props currently
accepts payloads with extra '/' segments (e.g., "a/b/c") because it ignores any
parts after parsing log_index and seqno; change the logic so after parsing
log_index and seqno (the code creating LogIndexAndSequencePair::new(log_index,
seqno)) you explicitly check that there are no remaining segments (e.g., ensure
parts.next().is_none()) and return None (reject) if any extra segment exists, so
only exactly two '/'-separated components are accepted.
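One recurring fix in the prompts above, guarding the `step_length_bit` shift, can be written with `checked_shl` so the `>= 64` case is explicit. This is a hedged sketch: the function name and the saturate-to-`u64::MAX` semantics follow the review's suggestion, not confirmed crate code.

```rust
// Compute (1 << bits) - 1 without an undefined or panicking shift:
// bits == 0 gives 0, and bits >= 64 saturates to an all-ones mask.
fn step_length_mask(bits: u32) -> u64 {
    match 1u64.checked_shl(bits) {
        Some(v) => v - 1,  // bits in 0..=63: ordinary mask
        None => u64::MAX,  // bits >= 64: checked_shl returns None
    }
}
```

`checked_shl` returns `None` whenever the shift amount is at least the bit width of the type, which is exactly the case the review flags as panicking in debug builds and wrapping in release builds.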
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (10)
- Cargo.toml
- src/logindex/Cargo.toml
- src/logindex/src/cf_tracker.rs
- src/logindex/src/collector.rs
- src/logindex/src/db_access.rs
- src/logindex/src/event_listener.rs
- src/logindex/src/lib.rs
- src/logindex/src/table_properties.rs
- src/logindex/src/types.rs
- src/logindex/tests/logindex_integration.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- src/logindex/src/db_access.rs
- src/logindex/tests/logindex_integration.rs
- Cargo.toml
Can the CI issues be resolved?
Actionable comments posted: 4
🧹 Nitpick comments (6)
src/logindex/src/lib.rs (1)
1-34: Duplicate license header. Remove the duplicate license block (Lines 18-34).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/lib.rs` around lines 1 - 34, Remove the duplicated license header block that repeats the Apache 2.0 license text (the second occurrence beginning with "Copyright (c) 2024-present, arana-db Community." and the following Apache license paragraphs) so the file only contains a single license header at the top; simply delete the repeated block (lines 18–34 in the shown diff) leaving the original header intact.
src/logindex/src/collector.rs (1)
1-34: Duplicate license header. Remove the duplicate license block (Lines 18-34).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/collector.rs` around lines 1 - 34, Remove the duplicated Apache license header block so only one header remains at the top of collector.rs; delete the second repeated license comment (the entire block starting at the second "Copyright (c) 2024-present, arana-db Community" header) and ensure spacing/formatting after the single remaining header is correct.
src/logindex/src/table_properties.rs (1)
1-34: Duplicate license header. Remove the duplicate license block (Lines 18-34).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/table_properties.rs` around lines 1 - 34, Remove the duplicate Apache license header block so only a single license header remains at the top of the file; locate the repeated license text (the second copy of the Apache License v2.0 header) and delete that entire duplicate block, preserving the original first header intact.
src/logindex/src/cf_tracker.rs (1)
1-34: Duplicate license header. Remove the duplicate license block (Lines 18-34).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/cf_tracker.rs` around lines 1 - 34, Remove the duplicated Apache license header in src/logindex/src/cf_tracker.rs by deleting the second repeated block (the entire repeated license paragraph that appears after the first header) so only a single license header remains at the top of the file; ensure no other code is altered.
src/logindex/src/event_listener.rs (1)
1-34: Duplicate license header. Remove the duplicate license block (Lines 18-34).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/event_listener.rs` around lines 1 - 34, Top of the file contains two identical Apache license headers; remove the second duplicate license block so only a single license header remains at the top of the file, ensuring surrounding spacing is preserved and no other code/comments are removed.
src/logindex/src/types.rs (1)
1-34: Duplicate license header. The file contains the same license header twice (Lines 1-17 and Lines 18-34). Remove one copy.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/types.rs` around lines 1 - 34, Remove the duplicated Apache-2.0 license header block found at the top of the file: keep a single copy of the license header (the standard Apache License, Version 2.0 block) and delete the second identical block so only one header remains in types.rs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/logindex/src/cf_tracker.rs`:
- Around line 217-220: The gap calculation uses raw i64 subtraction with `(last
- first) as u64` which can overflow; change it to perform a safe subtraction
(e.g., use `last.saturating_sub(first)` or use `last.checked_sub(first)` and
handle the None case) and then cast the non-negative result to `u64`; update the
code around the `let first = *s.first().unwrap(); let last = *s.last().unwrap();
(last - first) as u64` usage to use `saturating_sub` or checked logic so the
conversion to `u64` cannot overflow.
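A hedged sketch of the safe gap computation suggested above (the `index_gap` helper is invented for illustration; the real code operates on the values taken from `s.first()`/`s.last()`):

```rust
/// Gap between two recorded log indices; safe even when the i64 inputs are
/// far apart or out of order. saturating_sub clamps at i64::MAX instead of
/// wrapping, and max(0) keeps the cast to u64 from reinterpreting a
/// negative difference as a huge unsigned value.
fn index_gap(first: i64, last: i64) -> u64 {
    last.saturating_sub(first).max(0) as u64
}
```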
In `@src/logindex/src/collector.rs`:
- Around line 99-101: The current check casts smallest_applied_log_index (i64)
to u64 before masking with self.step_length_mask, which misbehaves for negative
values; update the logic to explicitly handle negative indices (e.g.,
early-return or normalize) before casting, or convert using a safe
absolute/unsigned strategy only when smallest_applied_log_index >= 0, and
include a comment documenting the chosen behavior; locate the check around
smallest_applied_log_index and self.step_length_mask and change it so you don't
directly cast negative i64 to u64 (or explicitly assert/handle negatives if they
are invalid).
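One possible shape of the guarded check (the `passes_step_mask` name and the "negatives never match" policy are assumptions for illustration; the module may instead want to assert on or normalize negative indices):

```rust
/// Checkpoint test on a possibly-negative log index. The mask trick
/// (idx & mask == 0) only makes sense for non-negative indices, so negative
/// values are rejected explicitly instead of being cast straight to u64
/// (where -1 would become u64::MAX and alias a large valid index).
fn passes_step_mask(smallest_applied_log_index: i64, step_length_mask: u64) -> bool {
    if smallest_applied_log_index < 0 {
        return false;
    }
    (smallest_applied_log_index as u64) & step_length_mask == 0
}
```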
In `@src/logindex/src/types.rs`:
- Around line 99-102: The current set method (and readers like eq_seqno,
le_seqno, and the Clone impl) perform two separate atomic stores/loads on
self.log_index and self.seqno which can produce torn pairs; make the pair
updates/reads atomic by either packing both values into a single atomic (e.g.,
AtomicU128/AtomicI128) and updating/reading that single word, or replace the two
atomics with a parking_lot::RwLock<(LogIndex, SequenceNumber)> and read/write
the tuple under the lock; update set, the accessors (log_index, seqno),
comparison helpers (eq_seqno, le_seqno, etc.), and Clone to use the chosen
atomic pack or the RwLock so a reader sees a consistent (log_index, seqno)
snapshot.
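A sketch of the single-atomic packing option (it assumes both fields fit in 32 bits, which may not hold for the real `LogIndex`/`SequenceNumber` types; if each needs 64 bits, the `RwLock<(LogIndex, SequenceNumber)>` alternative is the simpler choice since `AtomicU128` is not in stable std):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Both fields packed into one AtomicU64 so a reader always observes a
/// (log_index, seqno) pair that was written together -- no torn reads.
pub struct PackedPair(AtomicU64);

impl PackedPair {
    pub fn new(log_index: u32, seqno: u32) -> Self {
        PackedPair(AtomicU64::new(((log_index as u64) << 32) | seqno as u64))
    }

    /// Single store: no window where log_index is new but seqno is old.
    pub fn set(&self, log_index: u32, seqno: u32) {
        self.0
            .store(((log_index as u64) << 32) | seqno as u64, Ordering::Release);
    }

    /// Single load: the returned pair is always internally consistent.
    pub fn get(&self) -> (u32, u32) {
        let packed = self.0.load(Ordering::Acquire);
        ((packed >> 32) as u32, packed as u32)
    }
}
```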
- Around line 128-132: The PartialEq implementation for LogIndexSeqnoPair
currently uses eq_seqno and thus only compares seqno, ignoring log_index; update
the impl PartialEq for LogIndexSeqnoPair so equality checks both fields (seqno
and log_index) rather than calling eq_seqno, i.e., implement fn eq(&self, other:
&Self) -> bool { self.seqno == other.seqno && self.log_index == other.log_index
} (or, if intended, add clear documentation on LogIndexSeqnoPair::eq_seqno and
the intentional semantics), referencing the LogIndexSeqnoPair type, the impl
PartialEq block, and the eq_seqno helper to locate the code.
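A standalone sketch of the field-wise equality suggested above (the struct here is reduced to plain fields and the `i64`/`u64` aliases are assumptions; the actual type in types.rs holds atomics, so its impl would read through the accessors instead):

```rust
type LogIndex = i64;
type SequenceNumber = u64;

#[derive(Debug)]
pub struct LogIndexSeqnoPair {
    log_index: LogIndex,
    seqno: SequenceNumber,
}

// Field-wise equality: two pairs are equal only when BOTH seqno and
// log_index match, unlike an eq_seqno-based impl that ignores log_index.
impl PartialEq for LogIndexSeqnoPair {
    fn eq(&self, other: &Self) -> bool {
        self.seqno == other.seqno && self.log_index == other.log_index
    }
}
```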
---
Nitpick comments:
In `@src/logindex/src/cf_tracker.rs`:
- Around line 1-34: Remove the duplicated Apache license header in
src/logindex/src/cf_tracker.rs by deleting the second repeated block (the entire
repeated license paragraph that appears after the first header) so only a single
license header remains at the top of the file; ensure no other code is altered.
In `@src/logindex/src/collector.rs`:
- Around line 1-34: Remove the duplicated Apache license header block so only
one header remains at the top of collector.rs; delete the second repeated
license comment (the entire block starting at the second "Copyright (c)
2024-present, arana-db Community" header) and ensure spacing/formatting after
the single remaining header is correct.
In `@src/logindex/src/event_listener.rs`:
- Around line 1-34: Top of the file contains two identical Apache license
headers; remove the second duplicate license block so only a single license
header remains at the top of the file, ensuring surrounding spacing is preserved
and no other code/comments are removed.
In `@src/logindex/src/lib.rs`:
- Around line 1-34: Remove the duplicated license header block that repeats the
Apache 2.0 license text (the second occurrence beginning with "Copyright (c)
2024-present, arana-db Community." and the following Apache license paragraphs)
so the file only contains a single license header at the top; simply delete the
repeated block (lines 18–34 in the shown diff) leaving the original header
intact.
In `@src/logindex/src/table_properties.rs`:
- Around line 1-34: Remove the duplicate Apache license header block so only a
single license header remains at the top of the file; locate the repeated
license text (the second copy of the Apache License v2.0 header) and delete that
entire duplicate block, preserving the original first header intact.
In `@src/logindex/src/types.rs`:
- Around line 1-34: Remove the duplicated Apache-2.0 license header block found
at the top of the file: keep a single copy of the license header (the standard
Apache License, Version 2.0 block) and delete the second identical block so only
one header remains in types.rs.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- src/logindex/src/cf_tracker.rs
- src/logindex/src/collector.rs
- src/logindex/src/db_access.rs
- src/logindex/src/event_listener.rs
- src/logindex/src/lib.rs
- src/logindex/src/table_properties.rs
- src/logindex/src/types.rs
- src/logindex/tests/logindex_integration.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- src/logindex/tests/logindex_integration.rs
- src/logindex/src/db_access.rs
🧹 Nitpick comments (5)
src/logindex/src/table_properties.rs (1)
1-34: Duplicate license header. Same issue as other files — the Apache 2.0 license header appears twice. Remove one of the duplicates.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/table_properties.rs` around lines 1 - 34, The file contains a duplicated Apache 2.0 license header at the top; remove the redundant copy so there is only a single license header block. Open src/logindex/src/table_properties.rs, delete the second (or first) repeated license block while preserving one complete, correctly formatted Apache 2.0 header; ensure no other code or comments are removed and that file-level comments remain valid.
src/logindex/src/cf_tracker.rs (2)
1-34: Duplicate license header. Same issue as collector.rs — the Apache 2.0 license header appears twice. Remove one of the duplicates.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/cf_tracker.rs` around lines 1 - 34, The file cf_tracker.rs contains a duplicated Apache-2.0 license header at the top (same as in collector.rs); remove the redundant copy so only a single complete license header remains at the file top, ensuring no other code or comments are removed — look for the repeated Apache 2.0 block near the top of cf_tracker.rs and delete the duplicate block so functions/structs in the file remain unchanged.
106-118: Consider releasing write lock between iterations. The write lock is acquired on line 112 inside the loop but the lock scope extends to line 115. Since each CF is independent, consider restructuring to release the lock between iterations, or document that the current approach is intentional for consistency.
♻️ Restructure to minimize lock hold time
pub fn init<D: DbCfAccess>(&self, db: &D) -> Result<()> {
    for i in 0..COLUMN_FAMILY_COUNT {
        let collection = db.get_properties_of_all_tables_cf(i)?;
        if let Some(pair) = get_largest_log_index_from_collection(&collection) {
            let log_index = pair.applied_log_index();
            let seqno = pair.seqno();
-           let cf = self.cf.write();
-           cf[i].applied_index.set(log_index, seqno);
-           cf[i].flushed_index.set(log_index, seqno);
+           {
+               let cf = self.cf.write();
+               cf[i].applied_index.set(log_index, seqno);
+               cf[i].flushed_index.set(log_index, seqno);
+           }
        }
    }
    Ok(())
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/cf_tracker.rs` around lines 106 - 118, The loop in init holds the RwLock write guard across iterations because cf = self.cf.write() is assigned inside the loop but not dropped until after setting both applied_index and flushed_index; change to acquire the write lock only for the minimal scope per column family: for each i, compute collection and pair first (using get_largest_log_index_from_collection), then open a short-lived write guard (e.g., within a block or temporary variable) to set cf[i].applied_index.set(...) and cf[i].flushed_index.set(...), letting the guard drop before the next iteration so the lock is released between iterations; reference symbols: init, COLUMN_FAMILY_COUNT, get_largest_log_index_from_collection, self.cf.write, applied_index, flushed_index.
src/logindex/src/lib.rs (1)
1-34: Duplicate license header. Same issue as other files — the Apache 2.0 license header appears twice. Remove one of the duplicates.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/lib.rs` around lines 1 - 34, The file src/logindex/src/lib.rs contains the Apache 2.0 license header duplicated at the top; remove the redundant copy so only a single license header remains (keep one of the existing header blocks intact). Edit lib.rs to delete the second header block while preserving the original copyright and license text and any required spacing/comments.
src/logindex/src/collector.rs (1)
1-34: Duplicate license header. The Apache 2.0 license header appears twice (lines 1–17 and 18–34). Remove one of the duplicates.
🧹 Remove duplicate header
// Copyright (c) 2024-present, arana-db Community. All rights reserved.
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
-// Copyright (c) 2024-present, arana-db Community. All rights reserved.
-//
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the language governing permissions and
-// limitations under the License.
-
use crate::types::{LogIndex, LogIndexAndSequencePair, SequenceNumber};
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/logindex/src/collector.rs` around lines 1 - 34, The top of src/logindex/src/collector.rs contains a duplicated Apache-2.0 license header — remove the redundant copy so only a single license header remains at the file head; ensure you keep one complete header (the standard Apache 2.0 block) and delete the other duplicate, then run rustfmt/checks to confirm no formatting or compile issues from the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/logindex/src/cf_tracker.rs`:
- Around line 1-34: The file cf_tracker.rs contains a duplicated Apache-2.0
license header at the top (same as in collector.rs); remove the redundant copy
so only a single complete license header remains at the file top, ensuring no
other code or comments are removed — look for the repeated Apache 2.0 block near
the top of cf_tracker.rs and delete the duplicate block so functions/structs in
the file remain unchanged.
- Around line 106-118: The loop in init holds the RwLock write guard across
iterations because cf = self.cf.write() is assigned inside the loop but not
dropped until after setting both applied_index and flushed_index; change to
acquire the write lock only for the minimal scope per column family: for each i,
compute collection and pair first (using get_largest_log_index_from_collection),
then open a short-lived write guard (e.g., within a block or temporary variable)
to set cf[i].applied_index.set(...) and cf[i].flushed_index.set(...), letting
the guard drop before the next iteration so the lock is released between
iterations; reference symbols: init, COLUMN_FAMILY_COUNT,
get_largest_log_index_from_collection, self.cf.write, applied_index,
flushed_index.
In `@src/logindex/src/collector.rs`:
- Around line 1-34: The top of src/logindex/src/collector.rs contains a
duplicated Apache-2.0 license header — remove the redundant copy so only a
single license header remains at the file head; ensure you keep one complete
header (the standard Apache 2.0 block) and delete the other duplicate, then run
rustfmt/checks to confirm no formatting or compile issues from the change.
In `@src/logindex/src/lib.rs`:
- Around line 1-34: The file src/logindex/src/lib.rs contains the Apache 2.0
license header duplicated at the top; remove the redundant copy so only a single
license header remains (keep one of the existing header blocks intact). Edit
lib.rs to delete the second header block while preserving the original copyright
and license text and any required spacing/comments.
In `@src/logindex/src/table_properties.rs`:
- Around line 1-34: The file contains a duplicated Apache 2.0 license header at
the top; remove the redundant copy so there is only a single license header
block. Open src/logindex/src/table_properties.rs, delete the second (or first)
repeated license block while preserving one complete, correctly formatted Apache
2.0 header; ensure no other code or comments are removed and that file-level
comments remain valid.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
- Cargo.toml
- src/logindex/Cargo.toml
- src/logindex/src/cf_tracker.rs
- src/logindex/src/collector.rs
- src/logindex/src/event_listener.rs
- src/logindex/src/lib.rs
- src/logindex/src/table_properties.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- Cargo.toml
- src/logindex/src/event_listener.rs
- src/logindex/Cargo.toml